package com.qf.qfdrivermqtt.service;

import com.alibaba.fastjson.JSON;
import com.qf.common.bean.R;
import com.qf.common.constant.KafkaConstant;
import com.qf.common.dto.CmdParamDto;
import com.qf.qfdrivermqtt.gateways.CmdSender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Service that dispatches a device control command and forwards a copy of it to Kafka.
 *
 * <p>The command is serialized to JSON once; the same JSON string is handed to
 * {@link CmdSender} (addressed to a per-device topic) and then published to the
 * Kafka topic {@link KafkaConstant#DEVICE_CMD_TOPIC}.
 */
@Service
public class CmdService {

    /** Prefix of the per-device control topic; the device key is appended to it. */
    private static final String DEVICE_CTRL_TOPIC_PREFIX = "/qfjava/device/ctrl/";

    @Autowired
    private CmdSender cmdSender;

    // NOTE(review): left as a raw type on purpose. Parameterizing an @Autowired field changes
    // which bean Spring will match, and the bean definition is not visible from this file —
    // confirm the configured key/value types before tightening this to KafkaTemplate<K, V>.
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * Sends the given command to its target device and publishes the same payload to Kafka.
     *
     * @param cmdParamDto the command to send; its device key is appended to the control-topic
     *                    prefix to build the destination topic. Must not be {@code null}.
     * @return {@code R.ok()} once both sends have been issued; the outcome of the Kafka send
     *         is not inspected here.
     */
    public R send(CmdParamDto cmdParamDto) {
        String deviceTopic = DEVICE_CTRL_TOPIC_PREFIX + cmdParamDto.getDeviceKey();

        // Serialize once and reuse the exact same JSON for both destinations.
        String payload = JSON.toJSONString(cmdParamDto);

        cmdSender.send(deviceTopic, payload);

        // Publish the sent command to Kafka; per the original author's note this is how the
        // command data ends up being saved to MongoDB (the consumer is not part of this class).
        ProducerRecord<String, String> record =
                new ProducerRecord<>(KafkaConstant.DEVICE_CMD_TOPIC, payload);
        kafkaTemplate.send(record);

        return R.ok();
    }
}
